package ucloud

import (
	"sync"
	"sync/atomic"
)

var (
	// ucloudResource is the package's own *resource, built with NewResource
	// when the package variables are initialised.
	ucloudResource = NewResource()
	// DefaultResource is the Resource that the package-level Registry, Update
	// and List functions delegate to. It starts out as ucloudResource.
	DefaultResource Resource = ucloudResource
	// ukafkaChan is a buffered channel (capacity 20000) of *UkafkaCluster.
	// Update hands it to ukafkaResourceDesc and to updateKafka, which
	// receives from it.
	ukafkaChan = make(chan *UkafkaCluster, 20000)
)

func init() {
	ucloudResource = NewResource()
}

// Resource is the behaviour the package-level helpers rely on: storing a key
// pair, refreshing the cluster inventory and exposing it.
type Resource interface {
	// Registry records the private and public key strings.
	Registry(pri, pub string)
	// Update refreshes the resource's state and reports any failure.
	Update() error
	// List returns the known clusters keyed by string.
	List() map[string]*UkafkaCluster
}

// resource is the concrete type behind the package's default Resource; it
// carries the Registry, Update and List methods defined in this file.
type resource struct {
	// Inn is set to "ok" by NewResource.
	// NOTE(review): its purpose is not evident from this file — confirm.
	Inn string
	// secret holds the pri/pub strings written by Registry; Update calls
	// authInfoUpdate on it.
	secret *secret
	// zone is the zone list obtained from updateZone during Update.
	zone []*zoneInfo
	// UkafkaCluster maps a cluster's ClusterInstanceId to the cluster
	// recorded by updateKafka; List returns this map.
	UkafkaCluster map[string]*UkafkaCluster
}

// NewResource returns a fresh resource whose Inn is "ok", with an empty
// secret, no zones and an empty, writable cluster map.
func NewResource() *resource {
	res := new(resource)
	res.Inn = "ok"
	res.secret = &secret{}
	// zone is left at its zero value (nil) until Update fills it in.
	res.UkafkaCluster = map[string]*UkafkaCluster{}
	return res
}

// Registry stores the given private and public key strings in the
// resource's secret.
func (r *resource) Registry(pri, pub string) {
	creds := r.secret
	creds.pri, creds.pub = pri, pub
}

// Registry forwards the private and public key strings to DefaultResource.
func Registry(pri, pub string) {
	target := DefaultResource
	target.Registry(pri, pub)
}

// Update refreshes the zone list and gathers the UKafka clusters of every
// zone into r.UkafkaCluster.
//
// It obtains auth info from r.secret, asks it for the zones, and then starts
// one goroutine per zone that calls ukafkaResourceDesc with the shared
// ukafkaChan, plus one consumer goroutine running r.updateKafka. Update
// blocks until all of them have returned.
//
// It returns the error from authInfoUpdate or updateZone if either fails
// (r.zone is left untouched in that case), and nil otherwise. When there are
// no zones it returns nil without starting any goroutine.
func (r *resource) Update() error {
	auth, e := r.secret.authInfoUpdate()
	if nil != e {
		return e
	}
	zones, e := auth.updateZone()
	if nil != e {
		return e
	}
	r.zone = zones
	if len(zones) == 0 {
		// With no producers nothing would ever tell the consumer to stop,
		// so waiting on it would block forever.
		return nil
	}

	ukafkaClient := auth.ukafkaClient()
	// pending counts the zone workers that have not finished yet.
	pending := int32(len(zones))

	var wg sync.WaitGroup
	wg.Add(len(zones) + 1)
	go func() {
		defer wg.Done()
		// A nil num channel is never ready in a select, so the consumer
		// stops only on the nil sentinel queued behind the last cluster.
		r.updateKafka(ukafkaChan, nil)
	}()
	for _, z := range zones {
		go func(zone *zoneInfo) {
			defer wg.Done()
			defer func() {
				// The worker that brings pending to zero knows every
				// other worker has already returned from
				// ukafkaResourceDesc. The channel is FIFO, so the nil it
				// sends is received after everything queued before it.
				if atomic.AddInt32(&pending, -1) == 0 {
					ukafkaChan <- nil
				}
			}()
			ukafkaResourceDesc(ukafkaClient, *zone, ukafkaChan)
		}(z)
	}
	wg.Wait()
	return nil
}

// Update runs Update on DefaultResource and returns its result unchanged.
func Update() error {
	return DefaultResource.Update()
}

// List returns the resource's cluster map itself (not a copy), keyed by
// ClusterInstanceId.
func (r *resource) List() map[string]*UkafkaCluster {
	clusters := r.UkafkaCluster
	return clusters
}

// List returns whatever DefaultResource.List reports.
func List() map[string]*UkafkaCluster {
	clusters := DefaultResource.List()
	return clusters
}

// updateKafka consumes clusters from ch and records each one in
// r.UkafkaCluster under its ClusterInstanceId, unless an entry for that ID
// is already present (existing entries are never overwritten).
//
// It returns when either
//   - a nil cluster is received from ch, or
//   - a 0 is received from num; in that case everything still buffered in ch
//     is recorded first, so clusters queued before the stop signal are not
//     left behind.
//
// Non-zero values received from num are ignored.
func (r *resource) updateKafka(ch chan *UkafkaCluster, num chan int32) {
	// store records a cluster unless its ID already maps to a non-nil entry.
	store := func(c *UkafkaCluster) {
		if r.UkafkaCluster[c.ClusterInstanceId] == nil {
			r.UkafkaCluster[c.ClusterInstanceId] = c
		}
	}

	for {
		select {
		case ukafka := <-ch:
			if ukafka == nil {
				return
			}
			store(ukafka)
		case n := <-num:
			if n != 0 {
				continue
			}
			// select picks randomly among ready cases, so the stop signal
			// can win while ch still holds clusters. Drain without
			// blocking before leaving.
			for {
				select {
				case ukafka := <-ch:
					if ukafka == nil {
						return
					}
					store(ukafka)
				default:
					return
				}
			}
		}
	}
}